package com.kool.kmqtt.server;

import com.kool.kmqtt.server.task.ConnectCheckTask;
import com.kool.kmqtt.server.task.KeepliveTimeoutTask;
import com.kool.kmqtt.server.task.LogScheduler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import lombok.extern.slf4j.Slf4j;

/**
 * mqtt服务端启动类
 */
@Slf4j
public class MqttServer implements Runnable {
    private final ConnectCheckTask connectCheckTask = new ConnectCheckTask();
    private final KeepliveTimeoutTask keepliveTimeoutTask = new KeepliveTimeoutTask();
    private final LogScheduler logScheduler = new LogScheduler();

    private int port;

    public MqttServer(int port) {
        this.port = port;
        log.info("初始化 kmqtt server, port: {}", port);
        log.info("启动CONNECT超时检查任务");
        new Thread(connectCheckTask).start();
        log.info("启动keeplive超时检查任务");
        new Thread(keepliveTimeoutTask).start();
        if (ServerConfig.getInstance().getLogAnalysisSwitch()) {
            log.info("启动日志推送调度任务");
            logScheduler.schedule();
        }
    }

    /**
     * 启动服务端
     */
    public void start() {

        EventLoopGroup bossGroup = new NioEventLoopGroup(1); // (1)
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap(); // (2)
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class) // (3)
                    .childHandler(new ChannelInitializer<SocketChannel>() { // (4)
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new MqttServerChannelHandler(connectCheckTask));
                        }
                    })
                    .option(ChannelOption.SO_BACKLOG, 128)          // (5)
                    .childOption(ChannelOption.SO_KEEPALIVE, true); // (6)

            log.info("kmqtt server 准备就绪，开始接收连接");
            // Bind and start to accept incoming connections.
            ChannelFuture f = b.bind(port).sync(); // (7)

            // Wait until the server socket is closed.
            // In this example, this does not happen, but you can do that to gracefully
            // shut down your server.
            f.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            log.error(e.getMessage(), e);
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }

    @Override
    public void run() {
        start();
    }
}
